46.Python之路 - 实现线程池

Python之路 - 实现线程池 🍀

方式一 🍀

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import Queue
import threading


class ThreadPool(object):

def __init__(self, max_num=20):
self.queue = Queue.Queue(max_num)
for i in xrange(max_num):
self.queue.put(threading.Thread)

def get_thread(self):
return self.queue.get()

def add_thread(self):
self.queue.put(threading.Thread)

"""
使用:
pool = ThreadPool(10)

def func(arg, p):
import time
time.sleep(2)
p.add_thread()


for i in range(30):
thread = pool.get_thread()
t = thread(target=func, args=(i, pool))
t.start()
"""

方式二 🍀

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import queue
import threading
import contextlib
import time

StopEvent = object()

class ThreadPool(object):

def __init__(self, max_num, max_task_num = None):
if max_task_num:
self.q = queue.Queue(max_task_num)
else:
self.q = queue.Queue()
self.max_num = max_num
self.cancel = False
self.terminal = False
self.generate_list = []
self.free_list = []

def run(self, func, args, callback=None):
"""
线程池执行一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""
if self.cancel:
return
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
w = (func, args, callback,)
self.q.put(w)

def generate_thread(self):
"""
创建一个线程
"""
t = threading.Thread(target=self.call)
t.start()

def call(self):
"""
循环去获取任务函数并执行任务函数
"""
current_thread = threading.currentThread()
self.generate_list.append(current_thread)

event = self.q.get()
while event != StopEvent:

func, arguments, callback = event
try:
result = func(*arguments)
success = True
except Exception as e:
success = False
result = None

if callback is not None:
try:
callback(success, result)
except Exception as e:
pass

with self.worker_state(self.free_list, current_thread):
if self.terminal:
event = StopEvent
else:
event = self.q.get()
else:

self.generate_list.remove(current_thread)

def close(self):
"""
执行完所有的任务后,所有线程停止
"""
self.cancel = True
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent)
full_size -= 1

def terminate(self):
"""
无论是否还有任务,终止线程
"""
self.terminal = True

while self.generate_list:
self.q.put(StopEvent)

self.q.queue.clear()

@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于记录线程中正在等待的线程数
"""
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread)

"""
pool = ThreadPool(5)

def callback(status, result):
# status, execute action status
# result, execute action return value
pass

def action(i):
print(i)

for i in range(30):
ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
pool.close()
pool.terminate()
"""

本文标题:46.Python之路 - 实现线程池

文章作者:Jesse

发布时间:2020年06月26日 - 09:06

最后更新:2020年06月30日 - 20:06

原始链接:https://jesse.top/2020/06/26/python/06-Concurrent/Python之路 - 实现线程池/

许可协议: 禁止商业用途 转载请保留原文链接及作者。

坚持原创技术分享,您的支持将鼓励我继续创作!